{
 "cells": [
  {
   "cell_type": "markdown",
   "source": [
    "## A basic notebook to run the pipeline defined in `doc_pipeline.py`.\n",
    "\n",
    "By default this runs parts of the pipeline in parallel using threads or processes.\n",
    "\n",
    "To scale processing here look at all the subsequent cells that show how to run on \n",
    " ray or dask. For spark see spark/notebook.ipynb."
   ],
   "metadata": {
    "collapsed": false
   },
   "id": "c174ce5a23eed9a1"
  },
  {
   "cell_type": "code",
   "outputs": [],
   "source": [
    "import doc_pipeline\n",
    "\n",
    "from hamilton import driver\n",
    "from hamilton.execution import executors\n",
    "\n",
    "dr = (\n",
    "    driver.Builder()\n",
    "    .with_modules(doc_pipeline)\n",
    "    .enable_dynamic_execution(allow_experimental_mode=True)\n",
    "    .with_config({})\n",
    "    .with_local_executor(executors.SynchronousLocalTaskExecutor())\n",
    "    # Choose a backend to process the parallel parts of the pipeline\n",
    "    .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n",
    "    # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n",
    "    .build()\n",
    ")\n",
    "dag = dr.display_all_functions()\n",
    "result = dr.execute(\n",
    "    [\"collect_chunked_url_text\"],\n",
    "    inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n",
    ")\n",
    "# do something with the result...\n",
    "import pprint\n",
    "\n",
    "for chunk in result[\"collect_chunked_url_text\"]:\n",
    "    pprint.pprint(chunk)\n",
    "dag"
   ],
   "metadata": {
    "collapsed": true
   },
   "id": "initial_id",
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "# Ray"
   ],
   "metadata": {
    "collapsed": false
   },
   "id": "7bc40e6914aed330"
  },
  {
   "cell_type": "code",
   "outputs": [],
   "source": [
    "import logging\n",
    "\n",
    "import doc_pipeline\n",
    "import ray\n",
    "\n",
    "from hamilton import driver, log_setup\n",
    "from hamilton.plugins import h_ray\n",
    "\n",
    "log_setup.setup_logging(logging.INFO)\n",
    "ray.init()\n",
    "\n",
    "dr = (\n",
    "    driver.Builder()\n",
    "    .with_modules(doc_pipeline)\n",
    "    .enable_dynamic_execution(allow_experimental_mode=True)\n",
    "    .with_config({})\n",
    "    # Choose a backend to process the parallel parts of the pipeline\n",
    "    # .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n",
    "    # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n",
    "    .with_remote_executor(\n",
    "        h_ray.RayTaskExecutor()\n",
    "    )  # be sure to run ray.init() or pass in config.\n",
    "    .build()\n",
    ")\n",
    "dag = dr.display_all_functions()\n",
    "result = dr.execute(\n",
    "    [\"collect_chunked_url_text\"],\n",
    "    inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n",
    ")\n",
    "# do something with the result...\n",
    "import pprint\n",
    "\n",
    "for chunk in result[\"collect_chunked_url_text\"]:\n",
    "    pprint.pprint(chunk)\n",
    "\n",
    "ray.shutdown()\n",
    "dag"
   ],
   "metadata": {
    "collapsed": false
   },
   "id": "a4df6e50283f68ab"
  },
  {
   "cell_type": "markdown",
   "source": [
    "# Dask"
   ],
   "metadata": {
    "collapsed": false
   },
   "id": "46aa4763a337dcb1"
  },
  {
   "cell_type": "code",
   "outputs": [],
   "source": [
    "import logging\n",
    "\n",
    "import doc_pipeline\n",
    "from dask import distributed\n",
    "\n",
    "from hamilton import driver, log_setup\n",
    "from hamilton.plugins import h_dask\n",
    "\n",
    "log_setup.setup_logging(logging.INFO)\n",
    "\n",
    "cluster = distributed.LocalCluster()\n",
    "client = distributed.Client(cluster)\n",
    "remote_executor = h_dask.DaskExecutor(client=client)\n",
    "\n",
    "dr = (\n",
    "    driver.Builder()\n",
    "    .with_modules(doc_pipeline)\n",
    "    .enable_dynamic_execution(allow_experimental_mode=True)\n",
    "    .with_config({})\n",
    "    # Choose a backend to process the parallel parts of the pipeline\n",
    "    # .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n",
    "    # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n",
    "    .with_remote_executor(h_dask.DaskExecutor(client=client))\n",
    "    .build()\n",
    ")\n",
    "dag = dr.display_all_functions()\n",
    "result = dr.execute(\n",
    "    [\"collect_chunked_url_text\"],\n",
    "    inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n",
    ")\n",
    "# do something with the result...\n",
    "import pprint\n",
    "\n",
    "for chunk in result[\"collect_chunked_url_text\"]:\n",
    "    pprint.pprint(chunk)\n",
    "\n",
    "client.shutdown()\n",
    "dag"
   ],
   "metadata": {
    "collapsed": false
   },
   "id": "103824eec22810fe"
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
